package cn.gupao.pojo;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Reads application-log events from Kafka via Flink SQL and prints each row
 * to stdout through the 'print' connector.
 *
 * While reading from Kafka, the record's topic, partition and offset are
 * exposed as virtual metadata columns and concatenated into a unique row id.
 */
public class KafkaSource2PrintSink {


    public static void main(String[] args) throws Exception{

        // Local execution environment with the Flink web UI enabled,
        // convenient for inspecting the running job during development.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

        // Checkpoint every 30s so the Kafka source commits consumed offsets
        // back to the consumer group periodically.
        env.enableCheckpointing(30000);
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Create the Kafka source table. topic/partition/offset are read-only
        // METADATA VIRTUAL columns supplied by the Kafka connector; malformed
        // JSON records are skipped via 'json.ignore-parse-errors'.
        tEnv.executeSql(
                "CREATE TABLE kafka_event (        \n" +
                        "  `account` VARCHAR,        \n" +
                        "  `appId` VARCHAR,          \n" +
                        "  `appVersion` VARCHAR,     \n" +
                        "  `carrier` VARCHAR,        \n" +
                        "  `deviceId` VARCHAR,       \n" +
                        "  `eventId` VARCHAR,        \n" +
                        "  `ip` VARCHAR,             \n" +
                        "  `latitude` DOUBLE,        \n" +
                        "  `longitude` DOUBLE,       \n" +
                        "  `netType` VARCHAR,        \n" +
                        "  `osName` VARCHAR,         \n" +
                        "  `osVersion` VARCHAR,      \n" +
                        "  `releaseChannel` VARCHAR, \n" +
                        "  `resolution` VARCHAR,     \n" +
                        "  `sessionId` VARCHAR,      \n" +
                        "  `timeStamp` BIGINT,         \n" +
                        "  `properties` MAP<STRING, STRING>,     \n" +
                        "  `topic` VARCHAR METADATA VIRTUAL, \n" +
                        "  `partition` BIGINT METADATA VIRTUAL,  \n" +
                        "  `offset` BIGINT METADATA VIRTUAL      \n" +
                        ") WITH (\n" +
                        "  'connector' = 'kafka',\n" +
                        "  'topic' = 'eagle-applog',\n" +
                        "  'properties.bootstrap.servers' = 'node-1.51doit.cn:9092,node-2.51doit.cn:9092,node-3.51doit.cn:9092',\n" +
                        "  'properties.group.id' = 'g008',\n" +
                        "  'scan.startup.mode' = 'earliest-offset',\n" +
                        "  'format' = 'json',\n" +
                        "  'json.ignore-parse-errors' = 'true'\n" +
                        ")"
        );


        // Print sink table (for experimentation): schema mirrors the source
        // plus an `id` column derived from the Kafka metadata.
        tEnv.executeSql(
                "CREATE TABLE print_sink (\n" +
                        "  `id` VARCHAR ,            \n" +
                        "  `account` VARCHAR,        \n" +
                        "  `appId` VARCHAR,          \n" +
                        "  `appVersion` VARCHAR,     \n" +
                        "  `carrier` VARCHAR,        \n" +
                        "  `deviceId` VARCHAR,       \n" +
                        "  `eventId` VARCHAR,        \n" +
                        "  `ip` VARCHAR,             \n" +
                        "  `latitude` DOUBLE,        \n" +
                        "  `longitude` DOUBLE,       \n" +
                        "  `netType` VARCHAR,        \n" +
                        "  `osName` VARCHAR,         \n" +
                        "  `osVersion` VARCHAR,      \n" +
                        "  `releaseChannel` VARCHAR, \n" +
                        "  `resolution` VARCHAR,     \n" +
                        "  `sessionId` VARCHAR,      \n" +
                        "  `timeStamp` BIGINT,         \n" +
                        "  `properties` MAP<STRING, STRING>     \n" +
                        ") WITH (\n" +
                        "  'connector' = 'print'\n" +
                        ")"
        );

        // Continuous INSERT: build a unique id from topic_partition_offset and
        // forward rows with a non-null account and deviceId to the print sink.
        TableResult tableResult = tEnv.executeSql(
                "insert into print_sink select " +
                        "concat_ws('_', topic, cast(`partition` as varchar), cast(`offset` as varchar)) as id, " +
                        " account, appId, appVersion, carrier, deviceId, eventId, ip, latitude, longitude, netType, osName, " +
                        " osVersion, releaseChannel, resolution, sessionId, `timeStamp`, properties" +
                        " from kafka_event where account is not null and deviceId is not null"
        );

        // executeSql submits the INSERT job asynchronously. Without blocking
        // here, main() returns immediately and the local mini-cluster shuts
        // down before any records are consumed or printed. (env.execute() is
        // not needed for Table-API jobs — they are submitted by executeSql.)
        tableResult.await();
    }
}
